package com.atuguigu.flink.Day05;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

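// Real-time reconciliation implemented as a two-stream join.
// Each side waits up to 5s for the other; if no match arrives within 5s, reconciliation is treated as failed.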
public class Example3 {
    /**
     * Builds and runs the demo job.
     *
     * <p>Two bounded in-memory streams of {@link Event} (payment side and WeChat side) are
     * created, stamped with event time taken from {@code Event.timeStamp}, keyed by
     * {@code orderId}, connected, and handed to {@link MatchFunction}; whatever that function
     * emits is printed to standard output.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance keeps the printed output of this demo easy to follow.
        env.setParallelism(1);

        // Payment-side events.
        SingleOutputStreamOperator<Event> payStream = env
                .fromElements(
                        new Event("order1", "pay1", 1000L),
                        new Event("order2", "pay1", 8000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        // WeChat-side events.
        SingleOutputStreamOperator<Event> weixinStream = env
                .fromElements(
                        new Event("order1", "weixin2", 3000L),
                        new Event("order4", "weixin2", 4000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        // Key both sides by order id so events of the same order meet in the same keyed state.
        payStream
                .keyBy(event -> event.orderId)
                .connect(weixinStream.keyBy(event -> event.orderId))
                .process(new MatchFunction())
                .print();

        env.execute();
    }

    /**
     * Creates a fresh watermark strategy for monotonously increasing timestamps that reads the
     * event time from {@code Event.timeStamp}.
     */
    private static WatermarkStrategy<Event> eventTimeStrategy() {
        return WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, recordTimestamp) -> event.timeStamp);
    }

    /**
     * Reconciles payment events (input 1) with WeChat events (input 2) that share an order id.
     *
     * <p>The first event to arrive for a key is parked in keyed state and an event-time timer is
     * registered {@link #MATCH_TIMEOUT_MS} after its timestamp. If the counterpart arrives before
     * the timer fires, a success message is emitted, the parked event is cleared and its timer is
     * deleted. If the timer fires while an event is still parked, a failure message is emitted
     * for that event.
     */
    public static class MatchFunction extends CoProcessFunction<Event, Event, String> {
        /** Event-time span, in milliseconds, that one side waits for the other. */
        private static final long MATCH_TIMEOUT_MS = 5000L;

        /** Payment event still waiting for its WeChat counterpart (per key). */
        private ValueState<Event> payEvent;
        /** WeChat event still waiting for its payment counterpart (per key). */
        private ValueState<Event> weixinEvent;

        /** Obtains the two keyed state handles from the runtime context. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            payEvent = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("pay", Types.POJO(Event.class)));
            weixinEvent = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("weixin", Types.POJO(Event.class)));
        }

        /**
         * Handles a payment event.
         *
         * @param value the payment event
         * @param ctx   context giving access to the timer service
         * @param out   collector for reconciliation messages
         */
        @Override
        public void processElement1(Event value, Context ctx, Collector<String> out) throws Exception {
            Event pendingWeixin = weixinEvent.value();
            if (pendingWeixin != null) {
                // The WeChat event arrived first: the pair is complete.
                out.collect("对账成功,订单" + value.orderId + "的微信事件先到");
                // Clear the parked event and drop its timer so no failure is reported later.
                weixinEvent.clear();
                ctx.timerService().deleteEventTimeTimer(pendingWeixin.timeStamp + MATCH_TIMEOUT_MS);
            } else {
                // The WeChat event has not arrived yet: park this event and start waiting.
                payEvent.update(value);
                ctx.timerService().registerEventTimeTimer(value.timeStamp + MATCH_TIMEOUT_MS);
            }
        }

        /**
         * Handles a WeChat event.
         *
         * @param value the WeChat event
         * @param ctx   context giving access to the timer service
         * @param out   collector for reconciliation messages
         */
        @Override
        public void processElement2(Event value, Context ctx, Collector<String> out) throws Exception {
            Event pendingPay = payEvent.value();
            if (pendingPay != null) {
                // The payment event arrived first: the pair is complete.
                out.collect("对账成功,订单" + value.orderId + "的支付事件先到");
                // Clear the parked event and drop its timer so no failure is reported later.
                payEvent.clear();
                ctx.timerService().deleteEventTimeTimer(pendingPay.timeStamp + MATCH_TIMEOUT_MS);
            } else {
                // The payment event has not arrived yet: park this event and start waiting.
                weixinEvent.update(value);
                ctx.timerService().registerEventTimeTimer(value.timeStamp + MATCH_TIMEOUT_MS);
            }
        }

        /**
         * Fires when a waiting period ends; reports every event still parked for the current key
         * as a failed reconciliation and clears it.
         *
         * @param timestamp the timestamp of the firing timer
         * @param ctx       timer context
         * @param out       collector for reconciliation messages
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            Event unmatchedPay = payEvent.value();
            if (unmatchedPay != null) {
                // A payment is still parked, so its WeChat counterpart never showed up.
                out.collect("对账失败,微信订单" + unmatchedPay.orderId + "事件未到");
                payEvent.clear();
            }

            Event unmatchedWeixin = weixinEvent.value();
            if (unmatchedWeixin != null) {
                // A WeChat event is still parked, so its payment counterpart never showed up.
                out.collect("对账失败,支付订单" + unmatchedWeixin.orderId + "事件未到");
                weixinEvent.clear();
            }
        }
    }

    /**
     * Plain data holder for one order-related event.
     *
     * <p>The public no-argument constructor and public fields are kept on purpose: the class is
     * registered with {@code Types.POJO(Event.class)} elsewhere in this file.
     */
    public static class  Event{
        /** Identifier of the order this event belongs to. */
        public String orderId;
        /** Free-form label of the event kind. */
        public String eventType;
        /** Event time in epoch milliseconds; may be {@code null} on a default-constructed instance. */
        public Long timeStamp;

        /** Creates an empty event with all fields {@code null}. */
        public Event() {
        }

        /**
         * Creates a fully populated event.
         *
         * @param orderId   identifier of the order
         * @param eventType label of the event kind
         * @param timeStamp event time in epoch milliseconds
         */
        public Event(String orderId, String eventType, Long timeStamp) {
            this.orderId = orderId;
            this.eventType = eventType;
            this.timeStamp = timeStamp;
        }

        /**
         * Renders the event for logging, with the timestamp formatted via {@link Timestamp}.
         * A {@code null} timestamp is printed as {@code null} rather than being unboxed.
         */
        @Override
        public String toString() {
            // Guard against unboxing a null Long, which would throw NullPointerException.
            Timestamp readableTime = (timeStamp == null) ? null : new Timestamp(timeStamp);
            return "Event{" +
                    "orderId='" + orderId + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", timeStamp=" + readableTime +
                    '}';
        }
    }
}
